View Javadoc
1   package org.apache.maven.surefire.junitcore.pc;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.lang.annotation.Annotation;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.EnumMap;
28  import java.util.Iterator;
29  import java.util.LinkedHashSet;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.Executors;
35  import java.util.concurrent.ThreadFactory;
36  
37  import org.apache.maven.surefire.junitcore.JUnitCoreParameters;
38  import org.apache.maven.surefire.report.ConsoleLogger;
39  import org.apache.maven.surefire.testset.TestSetFailedException;
40  import org.apache.maven.surefire.util.internal.DaemonThreadFactory;
41  import org.junit.internal.runners.ErrorReportingRunner;
42  import org.junit.runner.Description;
43  import org.junit.runner.Runner;
44  import org.junit.runner.manipulation.Filter;
45  import org.junit.runner.manipulation.NoTestsRemainException;
46  import org.junit.runner.notification.RunNotifier;
47  import org.junit.runners.ParentRunner;
48  import org.junit.runners.Suite;
49  import org.junit.runners.model.InitializationError;
50  import org.junit.runners.model.RunnerBuilder;
51  
52  import static org.apache.maven.surefire.junitcore.pc.ParallelComputerUtil.resolveConcurrency;
53  import static org.apache.maven.surefire.junitcore.pc.Type.CLASSES;
54  import static org.apache.maven.surefire.junitcore.pc.Type.METHODS;
55  import static org.apache.maven.surefire.junitcore.pc.Type.SUITES;
56  
57  /**
58   * Executing suites, classes and methods with defined concurrency. In this example the threads which completed
59   * the suites and classes can be reused in parallel methods.
60   * <pre>
 * ConsoleLogger logger = ...;
 * JUnitCoreParameters parameters = ...;
 * ParallelComputerBuilder builder = new ParallelComputerBuilder(logger, parameters);
63   * builder.useOnePool(8).parallelSuites(2).parallelClasses(4).parallelMethods();
64   * ParallelComputerBuilder.ParallelComputer computer = builder.buildComputer();
65   * Class<?>[] tests = {...};
66   * new JUnitCore().run(computer, tests);
67   * </pre>
68   * Note that the type has always at least one thread even if unspecified. The capacity in
69   * {@link ParallelComputerBuilder#useOnePool(int)} must be greater than the number of concurrent suites and classes
70   * altogether.
71   * <p/>
72   * The Computer can be stopped in a separate thread. Pending tests will be interrupted if the argument is
73   * <tt>true</tt>.
74   * <pre>
75   * computer.describeStopped(true);
76   * </pre>
77   *
78   * @author Tibor Digana (tibor17)
79   * @since 2.16
80   */
81  public final class ParallelComputerBuilder
82  {
    /** Thread factory passed to every thread pool created by this builder. */
    private static final ThreadFactory DAEMON_THREAD_FACTORY = DaemonThreadFactory.newDaemonThreadFactory();

    /**
     * The <tt>net.jcip.annotations.NotThreadSafe</tt> annotation type, or <tt>null</tt> when that class
     * cannot be found or is not an annotation.
     */
    private static final Class<? extends Annotation> JCIP_NOT_THREAD_SAFE = loadNotThreadSafeAnnotations();

    /** Set containing only <tt>null</tt>; used to remove <tt>null</tt> elements from runner lists. */
    private static final Set<?> NULL_SINGLETON = Collections.singleton( null );

    /** Pool size meaning "not specified"; the total is then summed from the per-type thread counts. */
    static final int TOTAL_POOL_SIZE_UNDEFINED = 0;

    /** Number of threads requested for each parallel type. */
    private final Map<Type, Integer> parallelGroups = new EnumMap<Type, Integer>( Type.class );

    /** Logger handed to the schedulers and scheduling strategies created by the computer. */
    private final ConsoleLogger logger;

    /** <tt>true</tt> if a dedicated pool is created per scheduling level instead of one common pool. */
    private boolean useSeparatePools;

    /** Capacity of the common pool, or {@link #TOTAL_POOL_SIZE_UNDEFINED}. */
    private int totalPoolSize;

    /** Runtime parameters; <tt>null</tt> when the builder was created by the test-only constructor. */
    private JUnitCoreParameters parameters;

    /** When <tt>true</tt>, the counted suites/classes/methods are passed on to the concurrency resolution. */
    private boolean optimize;

    /** <tt>true</tt> only for the test-only constructor; thread counts are then not resolved from parameters. */
    private boolean runningInTests;
104 
105     /**
106      * Calling {@link #useSeparatePools()}.
107      * Can be used only in unit tests.
108      * Do NOT call this constructor in production.
109      */
110     ParallelComputerBuilder( ConsoleLogger logger )
111     {
112         this.logger = logger;
113         runningInTests = true;
114         useSeparatePools();
115         parallelGroups.put( SUITES, 0 );
116         parallelGroups.put( CLASSES, 0 );
117         parallelGroups.put( METHODS, 0 );
118     }
119 
    /**
     * Production constructor. Applies the same defaults as the test-only constructor, then clears the
     * "running in tests" flag so that thread counts are resolved from the given parameters.
     *
     * @param logger     logger passed to schedulers and strategies
     * @param parameters runtime parameters used for timeouts and concurrency resolution
     */
    public ParallelComputerBuilder( ConsoleLogger logger, JUnitCoreParameters parameters )
    {
        this( logger );
        runningInTests = false;
        this.parameters = parameters;
    }
126 
    /**
     * @return a new computer that takes a snapshot of this builder's current thread counts and pool settings
     */
    public ParallelComputer buildComputer()
    {
        return new PC();
    }

    /**
     * Selects separate pools and resets the common pool size to {@link #TOTAL_POOL_SIZE_UNDEFINED}.
     *
     * @return this builder
     */
    ParallelComputerBuilder useSeparatePools()
    {
        totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
        useSeparatePools = true;
        return this;
    }

    /**
     * Selects one common pool whose size is left undefined, i.e. derived later from the per-type thread counts.
     *
     * @return this builder
     */
    ParallelComputerBuilder useOnePool()
    {
        totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
        useSeparatePools = false;
        return this;
    }
145 
146     /**
147      * @param totalPoolSize Pool size where suites, classes and methods are executed in parallel.
148      *                      If the <tt>totalPoolSize</tt> is {@link Integer#MAX_VALUE}, the pool capacity is not
149      *                      limited.
150      * @throws IllegalArgumentException If <tt>totalPoolSize</tt> is &lt; 1.
151      */
152     ParallelComputerBuilder useOnePool( int totalPoolSize )
153     {
154         if ( totalPoolSize < 1 )
155         {
156             throw new IllegalArgumentException( "Size of common pool is less than 1." );
157         }
158         this.totalPoolSize = totalPoolSize;
159         useSeparatePools = false;
160         return this;
161     }
162 
    /**
     * @return the value last set by {@link #optimize(boolean)}; <tt>false</tt> by default
     */
    boolean isOptimized()
    {
        return optimize;
    }

    /**
     * @param optimize if <tt>true</tt>, the counted runners are supplied to the concurrency resolution
     * @return this builder
     */
    ParallelComputerBuilder optimize( boolean optimize )
    {
        this.optimize = optimize;
        return this;
    }
173 
    /**
     * Runs suites in parallel with {@link Integer#MAX_VALUE} threads.
     *
     * @return this builder
     */
    ParallelComputerBuilder parallelSuites()
    {
        return parallel( SUITES );
    }

    /**
     * @param nThreads number of threads for suites; must not be negative
     * @return this builder
     * @throws IllegalArgumentException if <tt>nThreads</tt> is negative
     */
    ParallelComputerBuilder parallelSuites( int nThreads )
    {
        return parallel( nThreads, SUITES );
    }

    /**
     * Runs classes in parallel with {@link Integer#MAX_VALUE} threads.
     *
     * @return this builder
     */
    ParallelComputerBuilder parallelClasses()
    {
        return parallel( CLASSES );
    }

    /**
     * @param nThreads number of threads for classes; must not be negative
     * @return this builder
     * @throws IllegalArgumentException if <tt>nThreads</tt> is negative
     */
    ParallelComputerBuilder parallelClasses( int nThreads )
    {
        return parallel( nThreads, CLASSES );
    }

    /**
     * Runs methods in parallel with {@link Integer#MAX_VALUE} threads.
     *
     * @return this builder
     */
    ParallelComputerBuilder parallelMethods()
    {
        return parallel( METHODS );
    }

    /**
     * @param nThreads number of threads for methods; must not be negative
     * @return this builder
     * @throws IllegalArgumentException if <tt>nThreads</tt> is negative
     */
    ParallelComputerBuilder parallelMethods( int nThreads )
    {
        return parallel( nThreads, METHODS );
    }
203 
    /**
     * Records the thread count for one parallel type.
     *
     * @param nThreads     number of threads; zero is accepted, negative values are rejected
     * @param parallelType the type to configure
     * @return this builder
     * @throws IllegalArgumentException if <tt>nThreads</tt> is negative (checked first)
     * @throws NullPointerException     if <tt>parallelType</tt> is <tt>null</tt>
     */
    private ParallelComputerBuilder parallel( int nThreads, Type parallelType )
    {
        if ( nThreads < 0 )
        {
            throw new IllegalArgumentException( "negative nThreads " + nThreads );
        }

        if ( parallelType == null )
        {
            throw new NullPointerException( "null parallelType" );
        }

        parallelGroups.put( parallelType, nThreads );
        return this;
    }

    /**
     * Records {@link Integer#MAX_VALUE} threads for the given type.
     *
     * @param parallelType the type to configure
     * @return this builder
     */
    private ParallelComputerBuilder parallel( Type parallelType )
    {
        return parallel( Integer.MAX_VALUE, parallelType );
    }
224 
225     private double parallelTestsTimeoutInSeconds()
226     {
227         return parameters == null ? 0d : parameters.getParallelTestsTimeoutInSeconds();
228     }
229 
230     private double parallelTestsTimeoutForcedInSeconds()
231     {
232         return parameters == null ? 0d : parameters.getParallelTestsTimeoutForcedInSeconds();
233     }
234 
235     @SuppressWarnings( "unchecked" )
236     private static Class<? extends Annotation> loadNotThreadSafeAnnotations()
237     {
238         try
239         {
240             Class c = Class.forName( "net.jcip.annotations.NotThreadSafe" );
241             return c.isAnnotation() ? (Class<? extends Annotation>) c : null;
242         }
243         catch ( ClassNotFoundException e )
244         {
245             return null;
246         }
247     }
248 
249     final class PC
250         extends ParallelComputer
251     {
        /** Scheduler source for runners that are not thread-safe. */
        private final SingleThreadScheduler notThreadSafeTests =
            new SingleThreadScheduler( ParallelComputerBuilder.this.logger );

        /** Thread-safe top-level {@link Suite} runners, in insertion order. */
        final Collection<ParentRunner> suites = new LinkedHashSet<ParentRunner>();

        /** Thread-safe {@link Suite} runners found as children while filtering the top-level suites. */
        final Collection<ParentRunner> nestedSuites = new LinkedHashSet<ParentRunner>();

        /** Thread-safe top-level runners which are not suites. */
        final Collection<ParentRunner> classes = new LinkedHashSet<ParentRunner>();

        /** Thread-safe non-suite runners found as children while filtering the top-level suites. */
        final Collection<ParentRunner> nestedClasses = new LinkedHashSet<ParentRunner>();

        /** Top-level runners which cannot be scheduled or are not thread-safe. */
        final Collection<Runner> notParallelRunners = new LinkedHashSet<Runner>();

        /** Capacity of the common pool; {@link #TOTAL_POOL_SIZE_UNDEFINED} if it has to be summed up. */
        int poolCapacity;

        /** <tt>true</tt> if no common pool is created. */
        boolean splitPool;

        /** Thread count per parallel type; starts as a copy of the builder's settings. */
        private final Map<Type, Integer> allGroups;

        /** Sum of the description children counts of all {@link #nestedClasses}. */
        private long nestedClassesChildren;

        /** Master scheduler assigned while building the suite; read by the stop/shutdown methods. */
        private volatile Scheduler master;
274 
        /**
         * Takes a snapshot of the enclosing builder: timeouts, per-type thread counts, pool capacity and
         * the separate-pools flag. Later changes of the builder do not affect this computer.
         */
        private PC()
        {
            super( parallelTestsTimeoutInSeconds(), parallelTestsTimeoutForcedInSeconds() );
            allGroups = new EnumMap<Type, Integer>( ParallelComputerBuilder.this.parallelGroups );
            poolCapacity = ParallelComputerBuilder.this.totalPoolSize;
            splitPool = ParallelComputerBuilder.this.useSeparatePools;
        }
282 
283         @Override
284         protected ShutdownResult describeStopped( boolean shutdownNow )
285         {
286             ShutdownResult shutdownResult = notThreadSafeTests.describeStopped( shutdownNow );
287             final Scheduler m = master;
288             if ( m != null )
289             {
290                 ShutdownResult shutdownResultOfMaster = m.describeStopped( shutdownNow );
291                 shutdownResult.getTriggeredTests().addAll( shutdownResultOfMaster.getTriggeredTests() );
292                 shutdownResult.getIncompleteTests().addAll( shutdownResultOfMaster.getIncompleteTests() );
293             }
294             return shutdownResult;
295         }
296 
297         @Override
298         boolean shutdownThreadPoolsAwaitingKilled()
299         {
300             boolean notInterrupted = notThreadSafeTests.shutdownThreadPoolsAwaitingKilled();
301             final Scheduler m = master;
302             if ( m != null )
303             {
304                 notInterrupted &= m.shutdownThreadPoolsAwaitingKilled();
305             }
306             return notInterrupted;
307         }
308 
        /**
         * Builds the resulting runner: collects the runners, counts them, optionally resolves the thread
         * counts from the parameters and finally assigns the schedulers.
         *
         * @throws InitializationError also thrown when the thread counts cannot be resolved; the
         *                             {@link TestSetFailedException} is then carried as its cause list
         */
        @Override
        public Runner getSuite( RunnerBuilder builder, Class<?>[] cls )
            throws InitializationError
        {
            try
            {
                // The returned runner is discarded; the call is made for its side effects.
                // NOTE(review): presumably it invokes getRunner() per class, which fills the collections.
                super.getSuite( builder, cls );
                populateChildrenFromSuites();

                WrappedRunners suiteSuites = wrapRunners( suites );
                WrappedRunners suiteClasses = wrapRunners( classes );

                long suitesCount = suites.size();
                long classesCount = classes.size() + nestedClasses.size();
                long methodsCount = suiteClasses.embeddedChildrenCount + nestedClassesChildren;
                // Builders created by the test-only constructor keep the explicitly configured counts.
                if ( !ParallelComputerBuilder.this.runningInTests )
                {
                    determineThreadCounts( suitesCount, classesCount, methodsCount );
                }

                return setSchedulers( suiteSuites.wrappingSuite, suiteClasses.wrappingSuite );
            }
            catch ( TestSetFailedException e )
            {
                throw new InitializationError( Collections.<Throwable>singletonList( e ) );
            }
        }
336 
337         @Override
338         protected Runner getRunner( RunnerBuilder builder, Class<?> testClass )
339             throws Throwable
340         {
341             Runner runner = super.getRunner( builder, testClass );
342             if ( canSchedule( runner ) )
343             {
344                 if ( !isThreadSafe( runner ) )
345                 {
346                     ( ( ParentRunner ) runner ).setScheduler( notThreadSafeTests.newRunnerScheduler() );
347                     notParallelRunners.add( runner );
348                 }
349                 else if ( runner instanceof Suite )
350                 {
351                     suites.add( (Suite) runner );
352                 }
353                 else
354                 {
355                     classes.add( (ParentRunner) runner );
356                 }
357             }
358             else
359             {
360                 notParallelRunners.add( runner );
361             }
362             return runner;
363         }
364 
365         private void determineThreadCounts( long suites, long classes, long methods )
366             throws TestSetFailedException
367         {
368             final JUnitCoreParameters parameters = ParallelComputerBuilder.this.parameters;
369             final boolean optimize = ParallelComputerBuilder.this.optimize;
370             RunnerCounter counts = new RunnerCounter( suites, classes, methods );
371             Concurrency concurrency = resolveConcurrency( parameters, optimize ? counts : null );
372             allGroups.put( SUITES, concurrency.suites );
373             allGroups.put( CLASSES, concurrency.classes );
374             allGroups.put( METHODS, concurrency.methods );
375             poolCapacity = concurrency.capacity;
376             splitPool &= concurrency.capacity <= 0; // fault if negative; should not happen
377         }
378 
379         private <T extends Runner> WrappedRunners wrapRunners( Collection<T> runners )
380             throws InitializationError
381         {
382             // Do NOT use allGroups here.
383             long childrenCounter = 0;
384             ArrayList<Runner> runs = new ArrayList<Runner>();
385             for ( T runner : runners )
386             {
387                 if ( runner != null )
388                 {
389                     int children = countChildren( runner );
390                     childrenCounter += children;
391                     if ( children != 0 )
392                     {
393                         runs.add( runner );
394                     }
395                 }
396             }
397             return runs.isEmpty() ? new WrappedRunners() : new WrappedRunners( createSuite( runs ), childrenCounter );
398         }
399 
400         private int countChildren( Runner runner )
401         {
402             Description description = runner.getDescription();
403             Collection children = description == null ? null : description.getChildren();
404             return children == null ? 0 : children.size();
405         }
406 
407         private ExecutorService createPool( int poolSize )
408         {
409             return poolSize < Integer.MAX_VALUE
410                 ? Executors.newFixedThreadPool( poolSize, DAEMON_THREAD_FACTORY )
411                 : Executors.newCachedThreadPool( DAEMON_THREAD_FACTORY );
412         }
413 
414         private Scheduler createMaster( ExecutorService pool, int poolSize )
415         {
416             final int finalRunnersCounter = countFinalRunners(); // can be 0, 1, 2 or 3
417             final SchedulingStrategy strategy;
418             if ( finalRunnersCounter <= 1 || poolSize <= 1 )
419             {
420                 strategy = new InvokerStrategy( ParallelComputerBuilder.this.logger );
421             }
422             else if ( pool != null && poolSize == Integer.MAX_VALUE )
423             {
424                 strategy = new SharedThreadPoolStrategy( ParallelComputerBuilder.this.logger, pool );
425             }
426             else
427             {
428                 strategy = SchedulingStrategies.createParallelStrategy( ParallelComputerBuilder.this.logger,
429                                                                         finalRunnersCounter );
430             }
431             return new Scheduler( ParallelComputerBuilder.this.logger, null, strategy );
432         }
433 
434         private int countFinalRunners()
435         {
436             int counter = notParallelRunners.isEmpty() ? 0 : 1;
437 
438             if ( !suites.isEmpty() && allGroups.get( SUITES ) > 0 )
439             {
440                 ++counter;
441             }
442 
443             if ( !classes.isEmpty() && allGroups.get( CLASSES ) > 0 )
444             {
445                 ++counter;
446             }
447 
448             return counter;
449         }
450 
451         private void populateChildrenFromSuites()
452         {
453             // Do NOT use allGroups here.
454             Filter filter = new SuiteFilter();
455             for ( Iterator<ParentRunner> it = suites.iterator(); it.hasNext(); )
456             {
457                 ParentRunner suite = it.next();
458                 try
459                 {
460                     suite.filter( filter );
461                 }
462                 catch ( NoTestsRemainException e )
463                 {
464                     it.remove();
465                 }
466             }
467         }
468 
469         private int totalPoolSize()
470         {
471             if ( poolCapacity == TOTAL_POOL_SIZE_UNDEFINED )
472             {
473                 int total = 0;
474                 for ( int nThreads : allGroups.values() )
475                 {
476                     total += nThreads;
477                     if ( total < 0 )
478                     {
479                         total = Integer.MAX_VALUE;
480                         break;
481                     }
482                 }
483                 return total;
484             }
485             else
486             {
487                 return poolCapacity;
488             }
489         }
490 
        /**
         * Creates the master scheduler and assigns schedulers to the suites, classes and the final runner.
         *
         * @param suiteSuites  suite wrapping the top-level suites, or <tt>null</tt>
         * @param suiteClasses suite wrapping the top-level classes, or <tt>null</tt>
         * @return the final runner, scheduled by the master scheduler
         */
        private Runner setSchedulers( ParentRunner suiteSuites, ParentRunner suiteClasses )
            throws InitializationError
        {
            int parallelSuites = allGroups.get( SUITES );
            int parallelClasses = allGroups.get( CLASSES );
            int parallelMethods = allGroups.get( METHODS );
            int poolSize = totalPoolSize();
            // No common pool with separate pools or when no thread is available at all.
            ExecutorService commonPool = splitPool || poolSize == 0 ? null : createPool( poolSize );
            // Must be assigned first: every createScheduler() call below passes the master field on.
            master = createMaster( commonPool, poolSize );

            if ( suiteSuites != null )
            {
                // a scheduler for parallel suites
                if ( commonPool != null && parallelSuites > 0 )
                {
                    Balancer balancer = BalancerFactory.createBalancerWithFairness( parallelSuites );
                    suiteSuites.setScheduler( createScheduler( null, commonPool, true, balancer ) );
                }
                else
                {
                    suiteSuites.setScheduler( createScheduler( parallelSuites ) );
                }
            }

            // schedulers for parallel classes
            ArrayList<ParentRunner> allSuites = new ArrayList<ParentRunner>( suites );
            allSuites.addAll( nestedSuites );
            if ( suiteClasses != null )
            {
                // the suite wrapping the top-level classes runs its children as classes, too
                allSuites.add( suiteClasses );
            }
            if ( !allSuites.isEmpty() )
            {
                setSchedulers( allSuites, parallelClasses, commonPool );
            }

            // schedulers for parallel methods
            ArrayList<ParentRunner> allClasses = new ArrayList<ParentRunner>( classes );
            allClasses.addAll( nestedClasses );
            if ( !allClasses.isEmpty() )
            {
                setSchedulers( allClasses, parallelMethods, commonPool );
            }

            // resulting runner for Computer#getSuite() scheduled by master scheduler
            ParentRunner all = createFinalRunner( removeNullRunners(
                Arrays.<Runner>asList( suiteSuites, suiteClasses, createSuite( notParallelRunners ) )
            ) );
            all.setScheduler( master );
            return all;
        }
542 
        /**
         * Creates the top-level suite whose run is surrounded by the before/after hooks.
         *
         * @param runners runners to be executed by the returned suite
         * @return suite invoking <tt>beforeRunQuietly()</tt> before and <tt>afterRunQuietly()</tt> after the run
         */
        private ParentRunner createFinalRunner( List<Runner> runners )
            throws InitializationError
        {
            return new Suite( null, runners )
            {
                @Override
                public void run( RunNotifier notifier )
                {
                    try
                    {
                        // NOTE(review): hooks are not defined in this class; presumably inherited from ParallelComputer.
                        beforeRunQuietly();
                        super.run( notifier );
                    }
                    finally
                    {
                        // runs even if the before-hook or the run itself throws
                        afterRunQuietly();
                    }
                }
            };
        }
563 
564         private void setSchedulers( Iterable<? extends ParentRunner> runners, int poolSize, ExecutorService commonPool )
565         {
566             if ( commonPool != null )
567             {
568                 Balancer concurrencyLimit = BalancerFactory.createBalancerWithFairness( poolSize );
569                 boolean doParallel = poolSize > 0;
570                 for ( ParentRunner runner : runners )
571                 {
572                     runner.setScheduler(
573                         createScheduler( runner.getDescription(), commonPool, doParallel, concurrencyLimit ) );
574                 }
575             }
576             else
577             {
578                 ExecutorService pool = null;
579                 if ( poolSize == Integer.MAX_VALUE )
580                 {
581                     pool = Executors.newCachedThreadPool( DAEMON_THREAD_FACTORY );
582                 }
583                 else if ( poolSize > 0 )
584                 {
585                     pool = Executors.newFixedThreadPool( poolSize, DAEMON_THREAD_FACTORY );
586                 }
587                 boolean doParallel = pool != null;
588                 for ( ParentRunner runner : runners )
589                 {
590                     runner.setScheduler( createScheduler( runner.getDescription(), pool, doParallel,
591                                                           BalancerFactory.createInfinitePermitsBalancer() ) );
592                 }
593             }
594         }
595 
596         private Scheduler createScheduler( Description desc, ExecutorService pool, boolean doParallel,
597                                            Balancer concurrency )
598         {
599             doParallel &= pool != null;
600             SchedulingStrategy strategy = doParallel
601                     ? new SharedThreadPoolStrategy( ParallelComputerBuilder.this.logger, pool )
602                     : new InvokerStrategy( ParallelComputerBuilder.this.logger );
603             return new Scheduler( ParallelComputerBuilder.this.logger, desc, master, strategy, concurrency );
604         }
605 
606         private Scheduler createScheduler( int poolSize )
607         {
608             final SchedulingStrategy strategy;
609             if ( poolSize == Integer.MAX_VALUE )
610             {
611                 strategy = SchedulingStrategies.createParallelStrategyUnbounded( ParallelComputerBuilder.this.logger );
612             }
613             else if ( poolSize == 0 )
614             {
615                 strategy = new InvokerStrategy( ParallelComputerBuilder.this.logger );
616             }
617             else
618             {
619                 strategy = SchedulingStrategies.createParallelStrategy( ParallelComputerBuilder.this.logger, poolSize );
620             }
621             return new Scheduler( ParallelComputerBuilder.this.logger, null, master, strategy );
622         }
623 
624         private boolean canSchedule( Runner runner )
625         {
626             return !( runner instanceof ErrorReportingRunner ) && runner instanceof ParentRunner;
627         }
628 
629         private boolean isThreadSafe( Runner runner )
630         {
631             return runner.getDescription().getAnnotation( JCIP_NOT_THREAD_SAFE ) == null;
632         }
633 
634         private class SuiteFilter
635             extends Filter
636         {
637             // Do NOT use allGroups in SuiteFilter.
638 
639             @Override
640             public boolean shouldRun( Description description )
641             {
642                 return true;
643             }
644 
645             @Override
646             public void apply( Object child )
647                 throws NoTestsRemainException
648             {
649                 super.apply( child );
650                 if ( child instanceof ParentRunner )
651                 {
652                     ParentRunner runner = ( ParentRunner ) child;
653                     if ( !isThreadSafe( runner ) )
654                     {
655                         runner.setScheduler( notThreadSafeTests.newRunnerScheduler() );
656                     }
657                     else if ( child instanceof Suite )
658                     {
659                         nestedSuites.add( (Suite) child );
660                     }
661                     else
662                     {
663                         ParentRunner parentRunner = (ParentRunner) child;
664                         nestedClasses.add( parentRunner );
665                         nestedClassesChildren += parentRunner.getDescription().getChildren().size();
666                     }
667                 }
668             }
669 
670             @Override
671             public String describe()
672             {
673                 return "";
674             }
675         }
676     }
677 
678     private static Suite createSuite( Collection<Runner> runners )
679         throws InitializationError
680     {
681         final List<Runner> onlyRunners = removeNullRunners( runners );
682         return onlyRunners.isEmpty() ? null : new Suite( null, onlyRunners )
683         {
684         };
685     }
686 
687     private static List<Runner> removeNullRunners( Collection<Runner> runners )
688     {
689         final List<Runner> onlyRunners = new ArrayList<Runner>( runners );
690         onlyRunners.removeAll( NULL_SINGLETON );
691         return onlyRunners;
692     }
693 }